/***
 * Copyright (c) 2021-2031 murenchao
 * fig is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *       http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
package cool.taomu.software.fig.mqtt.broker.impl

import cool.taomu.software.fig.mqtt.broker.MQTTCache
import cool.taomu.software.fig.mqtt.broker.entity.MessageEntity
import cool.taomu.software.fig.mqtt.broker.entity.PublishEntity
import cool.taomu.software.fig.mqtt.broker.entity.TopicEntity
import cool.taomu.software.fig.mqtt.broker.inter.IPublishObserver
import cool.taomu.software.fig.mqtt.broker.inter.IResponse
import cool.taomu.software.fig.mqtt.utils.CommonUtils
import io.netty.handler.codec.mqtt.MqttQoS
import java.util.List
import java.util.concurrent.atomic.AtomicInteger
import org.apache.commons.lang3.SerializationUtils
import org.apache.oro.text.perl.Perl5Util
import org.eclipse.xtend.lib.annotations.Accessors
import org.junit.Assert
import org.slf4j.LoggerFactory

/**
 * Publish observer bound to a single subscription.
 *
 * Each instance holds one subscribed {@link TopicEntity} and the
 * {@link IResponse} used to turn a {@link PublishEntity} into the object that
 * is written to the subscriber's channel. When {@link #update} is called, every
 * message whose topic matches the subscription filter is forwarded to the
 * subscriber's session.
 */
@Accessors
class Publish implements IPublishObserver {
    val LOG = LoggerFactory.getLogger(Publish);
    MQTTCache cache = MQTTCache.instance;
    /** The subscription (client id, topic filter, QoS) this observer serves. */
    TopicEntity topic;
    /** Builds the outbound object written to the subscriber's channel. */
    IResponse<PublishEntity> reponse;
    /** Counter shared by all instances; source of {@link #number}. */
    static AtomicInteger count = new AtomicInteger(0);
    /** Sequence number assigned to this instance at construction time. */
    int number = 0;

    /**
     * Creates an observer for one subscription.
     *
     * @param topic   the subscription to serve
     * @param reponse converter used to build the outbound publish object
     */
    new(TopicEntity topic, IResponse<PublishEntity> reponse) {
        // Use the value returned by the atomic increment itself; re-reading the
        // counter afterwards could hand the same number to two instances that
        // are constructed concurrently.
        this.number = count.incrementAndGet();
        this.topic = topic;
        this.reponse = reponse;
    }

    /**
     * Observer callback: forwards the given messages to the subscriber.
     *
     * @param messages messages to consider for delivery; null entries are skipped
     * @param type     notification type (not used by this implementation)
     */
    override update(List<MessageEntity> messages, Type type) {
        publishMessage(messages, type)
    }

    /**
     * Delivers every message whose topic matches the subscription filter.
     *
     * A failure while handling one message is logged and does not prevent the
     * remaining messages from being processed.
     *
     * @param messages messages to consider for delivery; null entries are skipped
     * @param type     notification type (not used here)
     */
    private def void publishMessage(List<MessageEntity> messages, Type type) {
        if (messages === null || topic === null || topic.name === null) {
            LOG.warn("publishMessage skipped: messages, topic or topic name is null");
            return;
        }
        // The matcher and the regex depend only on the subscription, so they are
        // built once per call instead of once per message.
        val p5 = new Perl5Util();
        // Translate the MQTT wildcards '+' and '#' into regex fragments and
        // escape '/' because it is also the Perl5 pattern delimiter.
        // NOTE(review): the resulting pattern is not anchored and other regex
        // metacharacters in the filter are not escaped, so a filter may match a
        // substring of a published topic - confirm whether that is intended.
        val subTopicName = topic.name.replace("/+", "/[a-zA-Z]?[a-zA-Z0-9]+").replace("/#",
            "/[a-zA-Z]?([a-zA-Z0-9/]*)").replace("/", "\\/");
        val pattern = "/" + subTopicName + "/";
        messages.filterNull.forEach [ msg |
            try {
                deliver(msg, p5, pattern, subTopicName);
            } catch (Exception ex) {
                // Delivery failures are real errors; do not hide them at debug level.
                LOG.error("publishMessage 方法出现错误 : ", ex);
            }
        ]
    }

    /**
     * Forwards a single message to the subscriber if its topic matches.
     *
     * @param msg          the message to deliver (never null)
     * @param matcher      Perl5 matcher reused across the batch
     * @param pattern      delimited Perl5 pattern built from the subscription filter
     * @param subTopicName the regex body, used for logging only
     */
    private def void deliver(MessageEntity msg, Perl5Util matcher, String pattern, String subTopicName) {
        LOG.info("订阅者id : {},  Topic : {}", topic.clientId, subTopicName);
        if (!matcher.match(pattern, msg.topic)) {
            return;
        }
        LOG.info("匹配到订阅 ： {}", subTopicName);
        // Push the matched message using the QoS computed from the message QoS
        // and the subscription QoS.
        val minQos = CommonUtils.getQos(msg.qos.value, topic.qos);
        if (minQos == 2) {
            // Record the QoS 2 message. The sender channel is re-attached to the
            // clone explicitly (presumably it does not survive serialization -
            // TODO confirm).
            val cloneMsg = SerializationUtils.clone(msg);
            cloneMsg.senderChannel = msg.senderChannel;
            cache.storeQoS2Message(topic.clientId, cloneMsg);
        }
        val clientSession = cache.loadSession(topic.clientId);
        if (clientSession === null) {
            // Previously a JUnit assertion: its AssertionError is not an
            // Exception, so it escaped the catch block and reached the caller.
            LOG.warn("No session found for subscriber {}; message on topic {} not delivered", topic.clientId,
                msg.topic);
            return;
        }
        LOG.info("订阅者id : {},  Topic : {}, 发送 ： {}", topic.clientId, subTopicName, new String(msg.payload))
        val entity = new PublishEntity(MqttQoS.valueOf(minQos), topic.name,
            clientSession.generateMessageId as Integer, msg.payload, false);
        clientSession.ctx.writeAndFlush(reponse.response(entity));
    }

}
